/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.schema.transform;

import static org.apache.phoenix.coprocessorclient.MetaDataProtocol.MIN_TABLE_TIMESTAMP;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.schema.ColumnMetaDataOps.addColumnMutation;
import static org.apache.phoenix.schema.MetaDataClient.UPDATE_INDEX_STATE_TO_ACTIVE;
import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
import static org.apache.phoenix.schema.PTableType.INDEX;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessorclient.TableInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnImpl;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TableViewFinderResult;
import org.apache.phoenix.util.UpgradeUtil;
import org.apache.phoenix.util.ViewUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.phoenix.thirdparty.com.google.common.base.Strings;

public class Transform extends TransformClient {
  private static final Logger LOGGER = LoggerFactory.getLogger(Transform.class);

  public static PTable getTransformingNewTable(PhoenixConnection connection, PTable oldTable)
    throws SQLException {
    SystemTransformRecord transformRecord =
      TransformClient.getTransformRecord(connection, oldTable.getType(), oldTable.getSchemaName(),
        oldTable.getTableName(), oldTable.getType() == INDEX ? oldTable.getParentTableName() : null,
        oldTable.getTenantId(), oldTable.getBaseTableLogicalName());

    PTable transformingNewTable = null;
    if (transformRecord != null && transformRecord.isActive()) {
      // New table will behave like an index
      PName newTableNameWithoutSchema = PNameFactory
        .newName(SchemaUtil.getTableNameFromFullName(transformRecord.getNewPhysicalTableName()));
      if (!newTableNameWithoutSchema.equals(oldTable.getPhysicalName(true))) {
        transformingNewTable =
          connection.getTableNoCache(transformRecord.getNewPhysicalTableName());
      }
    }
    return transformingNewTable;
  }

  public static void updateNewTableState(PhoenixConnection connection,
    SystemTransformRecord systemTransformRecord, PIndexState state) throws SQLException {
    String schema =
      SchemaUtil.getSchemaNameFromFullName(systemTransformRecord.getNewPhysicalTableName());
    String tableName =
      SchemaUtil.getTableNameFromFullName(systemTransformRecord.getNewPhysicalTableName());
    try (
      PreparedStatement tableUpsert = connection.prepareStatement(UPDATE_INDEX_STATE_TO_ACTIVE)) {
      tableUpsert.setString(1,
        systemTransformRecord.getTenantId() == null ? null : systemTransformRecord.getTenantId());
      tableUpsert.setString(2, schema);
      tableUpsert.setString(3, tableName);
      tableUpsert.setString(4, state.getSerializedValue());
      tableUpsert.setLong(5, 0);
      tableUpsert.setLong(6, 0);
      tableUpsert.execute();
    }
    // Update cache
    UpgradeUtil.clearCache(connection, connection.getTenantId(), schema, tableName,
      systemTransformRecord.getLogicalParentName(), MIN_TABLE_TIMESTAMP);
  }

  public static void removeTransformRecord(SystemTransformRecord transformRecord,
    PhoenixConnection connection) throws SQLException {
    connection
      .prepareStatement("DELETE FROM  " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME + " WHERE "
        + (Strings.isNullOrEmpty(transformRecord.getSchemaName())
          ? ""
          : (PhoenixDatabaseMetaData.TABLE_SCHEM + " ='" + transformRecord.getSchemaName()
            + "' AND "))
        + PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + " ='" + transformRecord.getLogicalTableName()
        + "' AND " + PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + " ='"
        + transformRecord.getNewPhysicalTableName() + "' AND "
        + PhoenixDatabaseMetaData.TRANSFORM_TYPE + " ="
        + transformRecord.getTransformType().getSerializedValue())
      .execute();
  }

  /**
   * Disable caching re-design if you use Online Data Format Change since the cutover logic is
   * currently incompatible and clients may not learn about the physical table change. See
   * https://issues.apache.org/jira/browse/PHOENIX-6883 and
   * https://issues.apache.org/jira/browse/PHOENIX-7284.
   */
  public static void doCutover(PhoenixConnection connection,
    SystemTransformRecord systemTransformRecord) throws Exception {
    String tenantId = systemTransformRecord.getTenantId();
    String schema = systemTransformRecord.getSchemaName();
    String tableName = systemTransformRecord.getLogicalTableName();
    String newTableName =
      SchemaUtil.getTableNameFromFullName(systemTransformRecord.getNewPhysicalTableName());

    // Calculate changed metadata
    List<String> columnNames = new ArrayList<>();
    List<String> columnValues = new ArrayList<>();

    getMetadataDifference(connection, systemTransformRecord, columnNames, columnValues);
    // TODO In the future, we need to handle rowkey changes and column type changes as well

    String changeViewStmt =
      "UPSERT INTO SYSTEM.CATALOG " + "(TENANT_ID, TABLE_SCHEM, TABLE_NAME %s) VALUES (?, ?, ? %s)";

    String changeTable = String.format("UPSERT INTO SYSTEM.CATALOG "
      + "(TENANT_ID, TABLE_SCHEM, TABLE_NAME, PHYSICAL_TABLE_NAME %s ) " + "VALUES(?, ?, ?, ? %s)",
      columnNames.size() > 0 ? "," + String.join(",", columnNames) : "",
      columnNames.size() > 0 ? "," + QueryUtil.generateInListParams(columnValues.size()) : "");

    LOGGER.info("About to do cutover via " + changeTable);
    TableViewFinderResult childViewsResult =
      ViewUtil.findChildViews(connection, tenantId, schema, tableName);
    boolean wasCommit = connection.getAutoCommit();
    connection.setAutoCommit(false);
    List<TableInfo> viewsToUpdateCache = new ArrayList<>();
    try {
      try (PreparedStatement stmt = connection.prepareStatement(changeTable)) {
        int param = 0;
        if (tenantId == null) {
          stmt.setNull(++param, Types.VARCHAR);
        } else {
          stmt.setString(++param, tenantId);
        }
        if (schema == null) {
          stmt.setNull(++param, Types.VARCHAR);
        } else {
          stmt.setString(++param, schema);
        }
        stmt.setString(++param, tableName);
        stmt.setString(++param, newTableName);
        for (int i = 0; i < columnValues.size(); i++) {
          stmt.setInt(++param, Integer.parseInt(columnValues.get(i)));
        }
        stmt.execute();
      }
      // Update column qualifiers
      PTable pNewTable = connection.getTable(systemTransformRecord.getNewPhysicalTableName());
      PTable pOldTable = connection.getTable(SchemaUtil.getTableName(schema, tableName));
      if (
        pOldTable.getImmutableStorageScheme() != pNewTable.getImmutableStorageScheme()
          || pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()
      ) {
        MetaDataClient.mutateTransformProperties(connection, tenantId, schema, tableName,
          newTableName, pNewTable.getImmutableStorageScheme(), pNewTable.getEncodingScheme());
        // We need to update the columns's qualifiers as well
        mutateColumns(connection.unwrap(PhoenixConnection.class), pOldTable, pNewTable);

        HashMap<String, PColumn> columnMap = new HashMap<>();
        for (PColumn column : pNewTable.getColumns()) {
          columnMap.put(column.getName().getString(), column);
        }

        // Also update view column qualifiers
        for (TableInfo view : childViewsResult.getLinks()) {
          PTable pView = connection.getTable(
            view.getTenantId() == null ? null : Bytes.toString(view.getTenantId()),
            SchemaUtil.getTableName(view.getSchemaName(), view.getTableName()));
          mutateViewColumns(connection.unwrap(PhoenixConnection.class), pView, pNewTable,
            columnMap);
        }
      }
      connection.commit();

      // We can have millions of views. We need to send it in batches
      int maxBatchSize = connection.getQueryServices().getConfiguration()
        .getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
      int batchSize = 0;
      for (TableInfo view : childViewsResult.getLinks()) {
        String changeView = String.format(changeViewStmt,
          columnNames.size() > 0 ? "," + String.join(",", columnNames) : "",
          columnNames.size() > 0 ? "," + QueryUtil.generateInListParams(columnValues.size()) : "");
        LOGGER.info("Cutover changing view via " + changeView);
        try (PreparedStatement stmt = connection.prepareStatement(changeView)) {
          int param = 0;
          if (view.getTenantId() == null || view.getTenantId().length == 0) {
            stmt.setNull(++param, Types.VARCHAR);
          } else {
            stmt.setString(++param, Bytes.toString(view.getTenantId()));
          }
          if (view.getSchemaName() == null || view.getSchemaName().length == 0) {
            stmt.setNull(++param, Types.VARCHAR);
          } else {
            stmt.setString(++param, Bytes.toString(view.getSchemaName()));
          }
          stmt.setString(++param, Bytes.toString(view.getTableName()));
          for (int i = 0; i < columnValues.size(); i++) {
            stmt.setInt(++param, Integer.parseInt(columnValues.get(i)));
          }
          stmt.execute();
        }
        viewsToUpdateCache.add(view);
        batchSize++;
        if (batchSize >= maxBatchSize) {
          connection.commit();
          batchSize = 0;
        }
      }
      if (batchSize > 0) {
        connection.commit();
        batchSize = 0;
      }

      connection.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
      UpgradeUtil.clearCacheAndGetNewTable(connection.unwrap(PhoenixConnection.class),
        connection.getTenantId(), schema, tableName, systemTransformRecord.getLogicalParentName(),
        MIN_TABLE_TIMESTAMP);
      for (TableInfo view : viewsToUpdateCache) {
        UpgradeUtil.clearCache(connection.unwrap(PhoenixConnection.class),
          PNameFactory.newName(view.getTenantId()),
          PNameFactory.newName(view.getSchemaName()).getString(),
          Bytes.toString(view.getTableName()), tableName, MIN_TABLE_TIMESTAMP);
      }

      // TODO: Cleanup syscat so that we don't have an extra index
    } catch (Exception e) {
      LOGGER.error("Error happened during cutover ", e);
      connection.rollback();
      throw e;
    } finally {
      connection.setAutoCommit(wasCommit);
    }
  }

  private static void getMetadataDifference(PhoenixConnection connection,
    SystemTransformRecord systemTransformRecord, List<String> columnNames,
    List<String> columnValues) throws SQLException {
    PTable pOldTable =
      connection.getTable(SchemaUtil.getQualifiedTableName(systemTransformRecord.getSchemaName(),
        systemTransformRecord.getLogicalTableName()));
    PTable pNewTable = connection.getTable(SchemaUtil.getQualifiedTableName(
      SchemaUtil.getSchemaNameFromFullName(systemTransformRecord.getNewPhysicalTableName()),
      SchemaUtil.getTableNameFromFullName(systemTransformRecord.getNewPhysicalTableName())));

    Map<String, String> map = pOldTable.getPropertyValues();
    for (Map.Entry<String, String> entry : map.entrySet()) {
      String oldKey = entry.getKey();
      String oldValue = entry.getValue();
      if (pNewTable.getPropertyValues().containsKey(oldKey)) {
        if (PHYSICAL_TABLE_NAME.equals(oldKey)) {
          // No need to add here. We will add it.
          continue;
        }
        String newValue = pNewTable.getPropertyValues().get(oldKey);
        if (!Strings.nullToEmpty(oldValue).equals(Strings.nullToEmpty(newValue))) {
          columnNames.add(oldKey);
          // properties value that corresponds to a number will not need single quotes around it
          // properties value that corresponds to a boolean value will not need single quotes around
          // it
          if (!Strings.isNullOrEmpty(newValue)) {
            if (
              !(StringUtils.isNumeric(newValue))
                && !(newValue.equalsIgnoreCase(Boolean.TRUE.toString())
                  || newValue.equalsIgnoreCase(Boolean.FALSE.toString()))
            ) {
              if (ENCODING_SCHEME.equals(oldKey)) {
                newValue = String.valueOf(
                  PTable.QualifierEncodingScheme.valueOf(newValue).getSerializedMetadataValue());
              } else if (IMMUTABLE_STORAGE_SCHEME.equals(oldKey)) {
                newValue = String.valueOf(
                  PTable.ImmutableStorageScheme.valueOf(newValue).getSerializedMetadataValue());
              } else {
                newValue = "'" + newValue + "'";
              }
            }
          }
          columnValues.add(newValue);
        }
      }
    }
  }

  public static void completeTransform(Connection connection, Configuration configuration)
    throws Exception {
    // Will be called from Reducer
    String tenantId = configuration.get(MAPREDUCE_TENANT_ID, null);
    String fullOldTableName = PhoenixConfigurationUtil.getInputTableName(configuration);
    String schemaName = SchemaUtil.getSchemaNameFromFullName(fullOldTableName);
    String oldTableLogicalName = SchemaUtil.getTableNameFromFullName(fullOldTableName);
    String indexTableName = SchemaUtil
      .getTableNameFromFullName(PhoenixConfigurationUtil.getIndexToolIndexTableName(configuration));
    String logicaTableName = oldTableLogicalName;
    String logicalParentName = null;
    if (
      PhoenixConfigurationUtil.getTransformingTableType(configuration)
          == IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE
    ) {
      if (!Strings.isNullOrEmpty(indexTableName)) {
        logicaTableName = indexTableName;
        logicalParentName = SchemaUtil.getTableName(schemaName, oldTableLogicalName);
      }
    }

    SystemTransformRecord transformRecord = getTransformRecord(schemaName, logicaTableName,
      logicalParentName, tenantId, connection.unwrap(PhoenixConnection.class));

    if (!PTable.TransformType.isPartialTransform(transformRecord.getTransformType())) {
      updateTransformRecord(connection.unwrap(PhoenixConnection.class), transformRecord,
        PTable.TransformStatus.PENDING_CUTOVER);
      connection.commit();
    } else {
      updateTransformRecord(connection.unwrap(PhoenixConnection.class), transformRecord,
        PTable.TransformStatus.COMPLETED);
      connection.commit();
    }
  }

  public static void updateTransformRecord(PhoenixConnection connection,
    SystemTransformRecord transformRecord, PTable.TransformStatus newStatus) throws SQLException {
    SystemTransformRecord.SystemTransformBuilder builder =
      new SystemTransformRecord.SystemTransformBuilder(transformRecord);
    builder.setTransformStatus(newStatus.name());
    builder.setLastStateTs(new Timestamp(EnvironmentEdgeManager.currentTimeMillis()));
    if (newStatus == PTable.TransformStatus.STARTED) {
      builder.setTransformRetryCount(transformRecord.getTransformRetryCount() + 1);
    }
    Transform.upsertTransform(builder.build(), connection);
  }

  private static void mutateColumns(PhoenixConnection connection, PTable pOldTable,
    PTable pNewTable) throws SQLException {
    if (pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
      Short nextKeySeq = 0;
      for (PColumn column : pNewTable.getColumns()) {
        boolean isPk = SchemaUtil.isPKColumn(column);
        Short keySeq = isPk ? ++nextKeySeq : null;
        PColumn newCol = new PColumnImpl(column.getName(), column.getFamilyName(),
          column.getDataType(), column.getMaxLength(), column.getScale(), column.isNullable(),
          column.getPosition(), column.getSortOrder(), column.getArraySize(),
          column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(),
          column.isRowTimestamp(), column.isDynamic(), column.getColumnQualifierBytes(),
          EnvironmentEdgeManager.currentTimeMillis());
        addColumnMutation(connection,
          pOldTable.getSchemaName() == null ? null : pOldTable.getSchemaName().getString(),
          pOldTable.getTableName().getString(), newCol,
          pNewTable.getParentTableName() == null
            ? null
            : pNewTable.getParentTableName().getString(),
          pNewTable.getPKName() == null ? null : pNewTable.getPKName().getString(), keySeq,
          pNewTable.getBucketNum() != null);
      }
    }
  }

  public static PTable getTransformedView(PTable pOldView, PTable pNewTable,
    HashMap<String, PColumn> columnMap, boolean withDerivedColumns) throws SQLException {
    List<PColumn> newColumns = new ArrayList<>();
    PTable pNewView = null;
    if (pOldView.getEncodingScheme() != pNewTable.getEncodingScheme()) {
      Short nextKeySeq = 0;
      PTable.EncodedCQCounter cqCounterToUse = pNewTable.getEncodedCQCounter();
      String defaultColumnFamily = pNewTable.getDefaultFamilyName() != null
        && !Strings.isNullOrEmpty(pNewTable.getDefaultFamilyName().getString())
          ? pNewTable.getDefaultFamilyName().getString()
          : DEFAULT_COLUMN_FAMILY;

      for (PColumn column : pOldView.getColumns()) {
        boolean isPk = SchemaUtil.isPKColumn(column);
        Short keySeq = isPk ? ++nextKeySeq : null;
        if (isPk) {
          continue;
        }
        String familyName = null;
        if (pNewTable.getImmutableStorageScheme() == SINGLE_CELL_ARRAY_WITH_OFFSETS) {
          familyName = column.getFamilyName() != null
            ? column.getFamilyName().getString()
            : defaultColumnFamily;
        } else {
          familyName = defaultColumnFamily;
        }
        int encodedCQ = pOldView.isAppendOnlySchema()
          ? Integer.valueOf(ENCODED_CQ_COUNTER_INITIAL_VALUE + keySeq)
          : cqCounterToUse.getNextQualifier(familyName);
        byte[] colQualifierBytes = EncodedColumnsUtil
          .getColumnQualifierBytes(column.getName().getString(), encodedCQ, pNewTable, isPk);
        if (columnMap.containsKey(column.getName().getString())) {
          colQualifierBytes = columnMap.get(column.getName().getString()).getColumnQualifierBytes();
        } else {
          if (!column.isDerived()) {
            cqCounterToUse.increment(familyName);
          }
        }

        if (!withDerivedColumns && column.isDerived()) {
          // Don't need to add/change derived columns
          continue;
        }

        PColumn newCol =
          new PColumnImpl(column.getName(), PNameFactory.newName(familyName), column.getDataType(),
            column.getMaxLength(), column.getScale(), column.isNullable(), column.getPosition(),
            column.getSortOrder(), column.getArraySize(), column.getViewConstant(),
            column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(),
            column.isDynamic(), colQualifierBytes, EnvironmentEdgeManager.currentTimeMillis());
        newColumns.add(newCol);
        if (!columnMap.containsKey(newCol.getName().getString())) {
          columnMap.put(newCol.getName().getString(), newCol);
        }
      }

      pNewView =
        PTableImpl.builderWithColumns(pOldView, newColumns)
          .setQualifierEncodingScheme(pNewTable.getEncodingScheme())
          .setImmutableStorageScheme(pNewTable.getImmutableStorageScheme())
          .setPhysicalNames(Collections.singletonList(SchemaUtil.getPhysicalHBaseTableName(
            pNewTable.getSchemaName(), pNewTable.getTableName(), pNewTable.isNamespaceMapped())))
          .build();
    } else {
      // Have to change this per transform type
    }
    return pNewView;
  }

  private static void mutateViewColumns(PhoenixConnection connection, PTable pView,
    PTable pNewTable, HashMap<String, PColumn> columnMap) throws SQLException {
    if (pView.getEncodingScheme() != pNewTable.getEncodingScheme()) {
      Short nextKeySeq = 0;
      PTable newView = getTransformedView(pView, pNewTable, columnMap, false);
      for (PColumn newCol : newView.getColumns()) {
        boolean isPk = SchemaUtil.isPKColumn(newCol);
        Short keySeq = isPk ? ++nextKeySeq : null;
        if (isPk) {
          continue;
        }
        String tenantId = pView.getTenantId() == null ? null : pView.getTenantId().getString();
        addColumnMutation(connection, tenantId,
          pView.getSchemaName() == null ? null : pView.getSchemaName().getString(),
          pView.getTableName().getString(), newCol,
          pView.getParentTableName() == null ? null : pView.getParentTableName().getString(),
          pView.getPKName() == null ? null : pView.getPKName().getString(), keySeq,
          pView.getBucketNum() != null);
      }
    }
  }

  public static void doForceCutover(Connection connection, Configuration configuration)
    throws Exception {
    PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
    // Will be called from Reducer
    String tenantId = configuration.get(MAPREDUCE_TENANT_ID, null);
    String fullOldTableName = PhoenixConfigurationUtil.getInputTableName(configuration);
    String schemaName = SchemaUtil.getSchemaNameFromFullName(fullOldTableName);
    String oldTableLogicalName = SchemaUtil.getTableNameFromFullName(fullOldTableName);
    String indexTableName = SchemaUtil
      .getTableNameFromFullName(PhoenixConfigurationUtil.getIndexToolIndexTableName(configuration));
    String logicaTableName = oldTableLogicalName;
    String logicalParentName = null;
    if (
      PhoenixConfigurationUtil.getTransformingTableType(configuration)
          == IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE
    ) if (!Strings.isNullOrEmpty(indexTableName)) {
      logicaTableName = indexTableName;
      logicalParentName = SchemaUtil.getTableName(schemaName, oldTableLogicalName);
    }

    SystemTransformRecord transformRecord = getTransformRecord(schemaName, logicaTableName,
      logicalParentName, tenantId, phoenixConnection);
    Transform.doCutover(phoenixConnection, transformRecord);
    updateTransformRecord(phoenixConnection, transformRecord, PTable.TransformStatus.COMPLETED);
    phoenixConnection.commit();
  }

}
